Problem:
SQL jest prosty. Funkcje okienkowe są proste, intuicyjne i przychodzi bez komplikacji. Natomiast w Spark? Jak wytłumaczyć koledze w zespole w jaki sposób tworzyć funkcję okienkowe w Spark? Od czego zacząć? Jakie są problemy i wyzwania w tworzeniu funkcji okienkowych? Jakie są różnic w porównaniu z SQL? Co jest łatwiejsze w utrzymaniu?
Rozwiązanie:
Na szybko można odpowiedzieć w ten sposób, że funkcja okienkowa tworzona jest w oderwaniu od data setu ale aplikowania na jego podzbiorze. Na przykład definicja wygląda w ten sposób:
user_window = Window.partitionBy("user_id").orderBy("event_time")
Jak to rozumieć? Dla każdego użytkownika okienko będzie posortowane po even_time i najstarsze dane będą pojawiały się na jako pierwsze a najświeższe dane wylądują w tym zbiorze danych jako ostatnie.
Potem wystarczy już tylko użyć funkcji okienkowej.
Ale zacznijmy po kolei i na przykładzie.
Funkcje okienkowe: Po co?
Po co są właściwie funkcje okienkowe? W jakich przypadkach się je stosuje?
Znasz zapewne funkcje grupujące? One działają na całym zbiorze dostępnych danych. Na całej tabeli lub dataframie.
Co jednak w przypadku, gdy w ramach tabeli chcesz przeliczyć sumy dla mniejszych podzbiorów. Klasyczny przykład to zliczenie sumy zarobków pracowników w departamencie.
Funkcje okienkowe są stworzone, do operacji które skupiają się na wycinku danych w ramach dużego zbioru.
Przykład
Dane na których będziemy pracować to tabela z użytkownikami odwiedzającymi witrynę internetową. Będziemy mieli dostępne identyfikator użytkownika, czas zaistnienia wydarzenia, typ wydarzenia i nazwa podstrony, która została odwiedzona.
Mniej więcej coś takiego:
create or replace table next_level_dm.d_user_sessions as
select 'USER001' as user_id,'2025-04-10 08:05:12' as event_time,'page_view' event_type,'/home' as page union all
select 'USER001','2025-04-10 08:06:45','page_view','/products' union all
select 'USER001','2025-04-10 08:09:11','click','/products/shoes' union all
select 'USER001','2025-04-10 08:45:23','page_view','/cart' union all
select 'USER001','2025-04-10 09:20:10','page_view','/home' union all
select 'USER001','2025-04-10 09:22:05','click','/checkout' union all
select 'USER001','2025-04-10 09:25:47','purchase','/thank-you' union all
select 'USER002','2025-04-10 10:15:30','page_view','/blog'
Zadanie do wykonania
Dla każdego użytkownika przypisz numer sesji. Nowa sesja zaczyna się, gdy czas między jednym a kolejnym odwiedzeniem strony jest większy niż 30 minut.
Zacznijmy od definiowania okna, czyli podzbioru danych, na których będziemy operować:
user_window = Window.partitionBy("user_id").orderBy("event_time")
Co tutaj się dzieje, zdefiniowaliśmy okno per użytkownik. Czyli wszystkie funkcje okienkowe będą wykonywane w tylko dla tak ograniczonego podzbioru danych w naszym przypadku w ramach użytkownika. Co jest też istotne ustaliliśmy sortowanie po czasie eventu. Czyli wydarzenia będą wyświetlane od najstarszego do najmłodszego.
Jak dobrze zauważyłeś, okno jest stworzone w oderwaniu od datasetu. Dopiero potem aplikowane jest dla konkretnej funkcji okienkowej w ramach datasetu.
Użycie funkcji okienkowej
W celu policzenia, czy od ostatniego eventu minęło więcej niż 30 minut, musimy znać czas wystąpienia poprzedniego eventu. Do tego celu użyjemy funkcji okienkowej lag - pobierającej poprzedni element.
df_prev_event = user_sessions.withColumn("prev_event_time_per_user", lag("event_time").over(user_window)))
Funkcja okienkowa lag pobiera poprzedni event_type dla okna zdefiniowanego w user_window. Czyli dla pierwszego rekordu będzie to wartość pusta (null) gdyż tam nie było jeszcze poprzedniego rekordu. Po dodaniu kolumny będziemy mieli taki zbiór danych:

Rozwiązujemy zadanie
W celu rozwiązania zadanie będziemy potrzebowali jeszcze.
- Policzyć ile czasu w minutach upłynęło pomiędzy wejściami na kolejne strony
- Sprawdzić czy różnica czasu jest większa od 30 minut, jeżeli tak ustawić flagę.
- Nadać kolejne numery sesjom użytkownika
Ad1:
withColumn(
"time_diff_minutes_per_user",
(unix_timestamp("event_time") - unix_timestamp("prev_event_time_per_user")) / 60
Pobranie poprzedniego event time dla użytkownika, jeżeli poprzedniego nie było wtedy zwracana jest wartość null.

Ad2:
withColumn("is_new_session", when(col("prev_event_time_per_user").isNull(), lit(1))
.when(col("time_diff_minutes_per_user") > 30, lit(1))
.otherwise(lit(0)))
Flaga wyliczona na podstawie wcześniej różnicy czasu.

Ad3.
withColumn("session_id", sum("is_new_session").over(user_window)
Tutaj znowu wykorzystujemy funkcję okienkową do zsumowania flag is_new_session. Po uruchomieniu tej funkcji dostaniemy taki wynik:

Spark kontra SQL
W Sparku okno definiujesz raz i raz przypisujesz do funkcji okienkowej. To jest zdecydowana zaleta, gdyż przy potencjalnych modyfikacjach zmiany okna, czyli albo zawężenia albo rozszerzenia lub zmianie warunków filtrowania, dokonujesz w je w jednym miejscu i nie musisz zmieniać kodu w wielu miejscach.
Takie definiowanie okna i wyzwalanie go w Sparku jest też intuicyjne. Różnicami jest to, że wystarczy okno zdefiniować raz i definiujemy je w oderwaniu od datasetu. Więc można tą samą funkcję okienkową używać w wielu datasetach.